muxers/yamux: Mitigation of unnecessary stream drops#3071
muxers/yamux: Mitigation of unnecessary stream drops#3071mergify[bot] merged 4 commits intomasterfrom
Conversation
rkuhn
left a comment
There was a problem hiding this comment.
Apart from the nitpick it looks good to me, thanks!
| return Poll::Ready(Ok(stream)); | ||
| } | ||
|
|
||
| self.inbound_stream_waker = Some(cx.waker().clone()); |
There was a problem hiding this comment.
The poll contract says that a waker needs to be registered in case Poll::Pending is returned. While perhaps this unconditional registration may be legal, I think it still violates expectations.
There was a problem hiding this comment.
This registration is not unconditional. It is conditional on the list being empty! The list no longer being empty is IMO a valid reason to wake the task that last polled here.
There was a problem hiding this comment.
Ah, are you saying that self.poll_inner will return Poll::Pending? That would be fine, then.
There was a problem hiding this comment.
OTOH: even if the inner stream does return Poll::Pending, it will have registered that same waker already, and it will thus wake it when new streams become available. What am I missing?
In general, the Rust async rules are thus: if you create a Poll::Pending, then you’re responsible for waking a waker. Otherwise you just pass along poll results.
There was a problem hiding this comment.
Ah, are you saying that
self.poll_innerwill returnPoll::Pending? That would be fine, then.
Not quite.
This poll function essentially should we polled again in two different scenarios:
- If the general poll-function pushed a new stream to the buffer.
- If the socket has more bytes to read (that is
poll_inner)
Registering this waker here isn't strictly necessary because we poll the StreamMuxer in swarm::Connection in a loop anyway. But I think it is more correct to still do this here because it showcases that there are two conditions on which the task should be polled again.
OTOH: even if the inner stream does return
Poll::Pending, it will have registered that same waker already, and it will thus wake it when new streams become available. What am I missing?
Like I said, it probably works without too because we will always be implicitly woken again, even if the task that calls the general poll consumes everything from the inner socket already and pushed them to the buffer.
There was a problem hiding this comment.
But I think it is more correct to still do this here because it showcases that there are two conditions on which the task should be polled again.
I agree with this. I don't think the inner working of impl StreamMuxer for Yamux should make assumptions on how it is called in Connection.
There was a problem hiding this comment.
Making things fail due to a missed wake-up would require polling this task through Arc<Mutex<...>>, which I hope nobody would ever consider doing, so I agree that this is harmless.
This discussion is yet another spotlight on how difficult it is to correctly reason about poll functions. This is exacerbated within libp2p by extending the concept — like is being done here — in non-trivial ways. If there actually was some surrounding task that did care about being woken for two different reasons, it would have to manufacture its own wakers (like FuturesUnordered does). But even that usage would be broken because the Waker destined for poll_inbound may be passed to poll_inner as well, overwriting a previously registered Waker that was destined for that purpose.
While debugging wake-up loops is painful, it is orders of magnitude easier than debugging missed wake-ups. May I suggest that we adhere to the policy that each async object offers exactly one poll function, with Future semantics, that drives exactly (i.e. only and completely) the state machine of that object? Interrogating the state machine should not have poll semantics, because that can lead to confusing behaviour and bugs.
The following pattern is what I suggest:
if let Poll::Ready(x) = connection.poll(cx) {
...
return x;
}
for sub in connection.inbound_streams.drain(..) {
...
}There was a problem hiding this comment.
May I suggest that we adhere to the policy that each async object offers exactly one
pollfunction, with Future semantics, that drives exactly (i.e. only and completely) the state machine of that object? Interrogating the state machine should not havepollsemantics, because that can lead to confusing behaviour and bugs.
The design of this trait was inspired by the AsyncWrite design which also has multiple poll_ functions that users need to drive. Sink is similar.
One problem with a single "poll" function design is that it puts more burden on implementations. For example, opening a new stream is not instantaneous, it may require negotiation of new credits with the other party. As such, a "new_outbound" function can only give you an ID or some other kind of handle for a new stream. This means every implementation needs to implement some kind of "stream ID" management. In contrast to that a poll_new_outbound function can just return Pending until the new stream is ready to be used.
There was a problem hiding this comment.
I never found Sink usable for anything I had to do, so its design carries little weight in my opinion.
Offering multiple “poll” functions to poll the same underlying thing has severe issues, as I argued above — and which you so far have not commented on. The example of a “new_outbound” function boils down to the choice of polling the machinery until the new connection is ready, ignoring everything else that happens in the meantime. This already requires the machinery to aggregate its output and let the poller inspect it, for which there is no reason a priori to offer a poll-shaped API. In particular, no Context is necessary to ask whether events have been emitted, which removes one prolific source of confusion inherent to Rust’s Task design.
So my solution to the new_outbound problem would be to offer a front-end Future that polls until the new connection is ready and leaves all other side-effects uninspected, to be dealt with by the caller afterwards.
There was a problem hiding this comment.
So my solution to the
new_outboundproblem would be to offer a front-end Future that polls until the new connection is ready and leaves all other side-effects uninspected, to be dealt with by the caller afterwards.
This is basically the poll_ready& start_send API of Sink then, yes?
Offering multiple “poll” functions to poll the same underlying thing has severe issues, as I argued above — and which you so far have not commented on.
poll_outbound is not the only issue. The problem of having to buffer streams is mostly because yamux doesn't allow us to backpressure the number of streams. The QUIC muxer on the other hand allows us to make progress on the connection itself without necessarily accepting new inbound streams. I am happy to change the API for something better but so far I've not found a solution where the caller (swarm::Connection) can explicitly signal to the muxer that it is now able to take more inbound streams.
We could move away from poll_inbound by having just a pop_inbound function. This however then requires more documentation on when the caller should call this function again if it ever returns None. At that stage, we are just re-inventing the wheel when we could also be using Poll and automatically wake the task when we know that there are new inbound streams available.
| this.inbound_stream_buffer.push_back(inbound_stream); | ||
|
|
||
| if let Some(waker) = this.inbound_stream_waker.take() { | ||
| waker.wake() |
There was a problem hiding this comment.
The discussion above makes me think: is this muxer polled from multiple tasks? If it is, then poll_inner will probably switch out wakers all the time, making it non-deterministic which caller will eventually be woken. If not, then this extra wakeup makes little sense to me.
There was a problem hiding this comment.
At least in rust-libp2p production code, it is only polled from a single task but it is a public interface so there may be other consumers, #2952 for example.
|
Thanks @thomaseizinger for providing the hotfix. I will take a deeper look. In the meantime @rkuhn can you confirm that this improves or resolves the issue you reported in #3041? |
This comment was marked as resolved.
This comment was marked as resolved.
rkuhn
left a comment
There was a problem hiding this comment.
Sorry for the long delay! I have now tested this branch (patched back to 0.49 in top-level Cargo.toml for drop-in replacement) with the ipfs-embed test suite and it works nicely! I don’t need to increase max_buffered_inbound_streams, increasing max_negotiating_inbound_streams suffices, also when hammering bitswap with 1800 Wants at once.
That is great to hear! I am inclined to remove the configuration knob again then. Less config surface is better IMO. |
3af6234 to
ca139c1
Compare
This comment was marked as resolved.
This comment was marked as resolved.
ca139c1 to
bfc9c4d
Compare
|
@mxinden I've rebased this PR onto the Let me know if you need any other changes. I'd suggest we do the release first and then merge this PR! |
|
@kpp @melekes @divagant-martian Would you mind testing this PR on one of your networks? Thank you 🙏 |
|
will do, sounds good if we run over the weekend and report back on monday (Tue in Australia)? |
Yeah that is totally fine! |
I would prefer patch releases to go through our pull request workflow as well. I created branch Once that is merged I will cut the patch release. We can then either merge branch |
mxinden
left a comment
There was a problem hiding this comment.
For the record, this fix looks good to me. Though see comment above before we can merge here.
|
We didn't see any notable difference, for better or worse, between v0.49.0 and this PR. Hope this helps 🤷 |
Can do but I also would like to understand what the benefit is? Are you trying to partly use GitFlow here? Renaming my branch to
There will be merge conflicts, which I already resolved in this PR as part of the last patch. |
|
See #3121. |
I think I misunderstood your comment in #3071 (comment). No benefit from my proposed workflow.
Got it. Thanks. I will merge #3121 and cut a release. Then we can merge this pull request into |
Description
A busy network connection can easily fill the internal buffer. We utilise
wake_by_refso we can returnPoll::Pendingwithout breaking thepoll-function contract. This gives callers a chance to callpoll_inboundand empty the buffer.Fixes #3041.
Notes
cc @rkuhn
Links to any relevant issues
max_negotiating_inbound_streamssetting #3041StreamMuxer#2861Open Questions
Change checklist
I have added tests that prove my fix is effective or that my feature works